package com.retry.task.core.executor.impl;

import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.retry.task.core.config.RetryTaskConsumeHolder;
import com.retry.task.core.constants.RetryTaskStatusEnum;
import com.retry.task.core.constants.RetryTaskURIEnum;
import com.retry.task.core.exception.ExceptionCode;
import com.retry.task.core.executor.AbstractExecutor;
import com.retry.task.core.heart.HeartTickDO;
import com.retry.task.core.model.*;
import com.retry.task.core.model.base.Result;
import com.retry.task.core.notify.NotifyFactory;
import com.retry.task.core.task.ConsumeStatus;
import com.retry.task.core.task.RetryTaskFactory;
import com.retry.task.core.task.interfaces.AbstractRetryTaskListener;
import com.retry.task.core.task.interfaces.IRetryTaskListener;
import com.retry.task.core.threadlocal.RetryTaskThreadLocal;
import com.retry.task.core.utils.ExceptionUtils;
import com.retry.task.core.utils.GsonTool;
import com.retry.task.core.utils.HttpClientUtils;
import com.retry.task.core.utils.IpUtils;
import com.retry.task.core.utils.threadPool.RetryTaskLogRunable;
import com.retry.task.core.utils.threadPool.RetryTaskThreadPoolExector;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Executor that runs retry tasks on a shared worker pool and reports each execution back to the
 * server address carried by the current {@link HeartTickDO}.
 *
 * <p>Only {@link #run(RetryTaskContext)} does real work; {@link #heart(HeartDTO)},
 * {@link #killTask()}, {@link #callBack()} and {@link #createSchedulerTask(RetryTaskDTO)}
 * return {@code null}.
 *
 * @author gao.gwq
 * @version 1.0
 * @date 2022/4/29  17:09
 */
public class ClientExecutor extends AbstractExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientExecutor.class);

    /**
     * Shared pool on which listeners are invoked.
     *
     * <p>NOTE(review): the work queue is an unbounded {@link LinkedBlockingQueue}. With a plain
     * {@link ThreadPoolExecutor} that means the pool never grows past its core size and the
     * rejection handler only fires after shutdown — confirm whether
     * {@code RetryTaskThreadPoolExector} changes that behavior, otherwise the value 200 is
     * never reached.
     */
    private static final RetryTaskThreadPoolExector taskThreadPoolExector = new RetryTaskThreadPoolExector(
        8,
        200,
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        // "%d" is replaced by the factory's thread counter so worker threads are distinguishable.
        new BasicThreadFactory.Builder().namingPattern("retry-task-client-process-%d").build(),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                throw new RuntimeException("RetryTaskServer bizThreadPool is EXHAUSTED!");
            }
        });

    static {
        // Stop accepting new work when the JVM exits; already queued tasks are not cancelled.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                taskThreadPoolExector.shutdown();
                LOGGER.info("-------------retry-task shutdown netty client router  thread pool ");
            }
        }));
    }

    /**
     * Not implemented by this executor.
     *
     * @param heartDTO ignored
     * @return always {@code null}
     */
    @Override
    public Result<String> heart(HeartDTO heartDTO) {
        return null;
    }

    /**
     * Submits the task to the worker pool and returns immediately.
     *
     * <p>The returned result only tells whether the task was handed to the pool; the outcome of
     * the task itself is reported by the worker through the task-log update.
     *
     * @param context the task to execute
     * @return a failure with {@link ExceptionCode#CAN_NOT_FIND_HEART_TICK} when no heart tick is
     *         available, otherwise a success result
     */
    @Override
    public Result<String> run(final RetryTaskContext context) {
        final Result<HeartTickDO> heartTickResult = RetryTaskFactory.getInstance().getHeartTickDO();
        if (!heartTickResult.isSuccess()) {
            return Result.getFail(ExceptionCode.CAN_NOT_FIND_HEART_TICK, "retry-task can not find heartTick ");
        }
        // The submitting thread's MDC map is passed along with the task to the log runnable.
        Runnable logRun = new RetryTaskLogRunable<>(MDC.getCopyOfContextMap(), new Runnable() {
            @Override
            public void run() {
                try {
                    AttributeModel attributeModel = context.getAttributeModel();
                    RetryTaskThreadLocal.setAttributeModelThreadLocal(attributeModel);
                    doRunRetryTask(context, heartTickResult.getData());
                } finally {
                    // Pool threads are reused: always clear the thread-local.
                    RetryTaskThreadLocal.removeAttribute();
                }
            }
        });
        taskThreadPoolExector.execute(logRun);
        return Result.getSuccess(Result.SUCESS);
    }

    /**
     * Runs one task on the worker thread: records the start, resolves the listener registered
     * under {@code projectName#taskName} and invokes it.
     *
     * @param context     the task to execute
     * @param heartTickDO heart tick whose server address receives the log updates
     * @return a failure when the start update answers with code "200" or when no listener is
     *         registered, otherwise a success result
     */
    private Result<String> doRunRetryTask(RetryTaskContext context, HeartTickDO heartTickDO) {

        String code = updateLogStartTimeAndTaskInfo(context, heartTickDO);
        // Code "200" on the start update is treated as "task already consumed": do not run it again.
        if (StringUtils.equals(code, "200")) {
            LOGGER.info("retry task is already consume ,msg {}", GsonTool.toJsonStringIgnoreNull(context));
            return Result.getFail("100", "retry task is already consume");
        }

        String key = context.getProjectName() + "#" + context.getTaskName();
        IRetryTaskListener iRetryTaskListener = RetryTaskConsumeHolder.getRetryTaskListener(key);
        if (iRetryTaskListener == null) {
            LOGGER.warn("retry-task can not find consumer by key {}", key);
            updateRetryTaskLog(context, heartTickDO, RetryTaskStatusEnum.FAIL,
                "can not  find task name! taskName is " + key);
            return Result.getFail("100", "can not find consumer");
        }
        doConsume(context, iRetryTaskListener);
        return Result.getSuccess(Result.SUCESS);
    }

    /**
     * Invokes the listener, then reports the outcome and triggers the notification.
     *
     * <p>Anything thrown by the listener is caught and recorded as a failed execution. A
     * {@code null} consume status is treated as success.
     *
     * @param retryTaskContext  the task to execute
     * @param retryTaskListener the listener that consumes the task
     */
    private void doConsume(RetryTaskContext retryTaskContext,
        IRetryTaskListener retryTaskListener) {
        String errMsg = null;
        RetryTaskStatusEnum statusEnum = RetryTaskStatusEnum.SUCESS;
        try {
            ConsumeStatus taskStatus;
            if (retryTaskListener instanceof AbstractRetryTaskListener) {
                AbstractRetryTaskListener taskListener = (AbstractRetryTaskListener)retryTaskListener;
                taskStatus = taskListener.consumeContext(retryTaskContext);
            } else {
                taskStatus = retryTaskListener.consume(retryTaskContext);
            }
            if (taskStatus != null && taskStatus.equals(ConsumeStatus.FAIL)) {
                statusEnum = RetryTaskStatusEnum.FAIL;
                errMsg = "执行失败";
            }

        } catch (Throwable throwable) {
            LOGGER.warn("consume retry task exception", throwable);
            errMsg = ExceptionUtils.createStackTrackMessage(throwable);
            statusEnum = RetryTaskStatusEnum.FAIL;
        } finally {
            // The heart tick is fetched again here rather than reusing the one from submission.
            // No `return` inside `finally`: that would discard any exception still in flight.
            Result<HeartTickDO> heartTickResult = RetryTaskFactory.getInstance().getHeartTickDO();
            if (heartTickResult.isSuccess()) {
                RetryTaskLogDTO retryTaskLogDTO = updateRetryTaskLog(retryTaskContext, heartTickResult.getData(),
                    statusEnum, errMsg);
                NotifyFactory.doNotify(retryTaskContext, retryTaskLogDTO);
            } else {
                LOGGER.warn("retry-task can not find heartTick, skip task log update and notify, task {}#{}",
                    retryTaskContext.getProjectName(), retryTaskContext.getTaskName());
            }
        }
    }

    /**
     * Updates the task log with the end time and the final status.
     *
     * <p>Best effort: a failed or throwing HTTP call is logged and otherwise ignored.
     *
     * @param retryTaskContext the executed task
     * @param heartTickDO      heart tick whose server address receives the update
     * @param statusEnum       final status of the execution
     * @param errMsg           error description, or {@code null} when there is none
     * @return the log DTO that was built (returned even when sending it failed)
     */
    private RetryTaskLogDTO updateRetryTaskLog(RetryTaskContext retryTaskContext, HeartTickDO heartTickDO,
        RetryTaskStatusEnum statusEnum, String errMsg) {
        RetryTaskLogDTO retryTaskLogDTO = new RetryTaskLogDTO();
        try {
            // setBasicLogInfo already copies the token, so it is not set a second time here.
            setBasicLogInfo(retryTaskLogDTO, retryTaskContext);

            retryTaskLogDTO.setEndTime(new Date());
            retryTaskLogDTO.setErrorMessage(errMsg);
            retryTaskLogDTO.setStatus(statusEnum.getCode());
            // NOTE(review): type 1 presumably marks the "end of execution" update — confirm.
            retryTaskLogDTO.setType(1);
            String realUri = heartTickDO.getServerIp() + RetryTaskURIEnum.UPDATE_RETRY_TASK_LOG.getUri();
            Result<?> resultDTO = HttpClientUtils.postBody(realUri, retryTaskLogDTO,
                String.class);
            if (!resultDTO.isSuccess()) {
                LOGGER.warn("update retry task log error,{}", resultDTO.getErrMsg());
            }
        } catch (Exception ex) {
            LOGGER.warn("update retry task log exception", ex);
        }
        return retryTaskLogDTO;
    }

    /**
     * Updates the task log with the start time, the in-progress status and the executing IP.
     *
     * <p>Also sets {@code lastEndHeartTime} on the given heart tick to the current time.
     *
     * @param retryTaskContext the task about to be executed
     * @param heartTickDO      heart tick whose server address receives the update
     * @return "0" when the update succeeded or an exception occurred; otherwise the result code
     *         of the failed update
     */
    private String updateLogStartTimeAndTaskInfo(RetryTaskContext retryTaskContext, HeartTickDO heartTickDO) {
        try {
            RetryTaskLogDTO retryTaskLogDTO = new RetryTaskLogDTO();
            setBasicLogInfo(retryTaskLogDTO, retryTaskContext);

            retryTaskLogDTO.setStartTime(new Date());
            // NOTE(review): type 0 presumably marks the "start of execution" update — confirm.
            retryTaskLogDTO.setType(0);
            retryTaskLogDTO.setStatus(RetryTaskStatusEnum.PROGRESS.getCode());
            retryTaskLogDTO.setExecuteIp(IpUtils.getIp());
            String realUri = heartTickDO.getServerIp() + RetryTaskURIEnum.UPDATE_RETRY_TASK_LOG.getUri();
            heartTickDO.setLastEndHeartTime(new Date());
            Result<?> resultDTO = HttpClientUtils.postBody(realUri, retryTaskLogDTO,
                String.class);
            if (!resultDTO.isSuccess()) {
                LOGGER.warn("create retry task log error,{}", resultDTO.getErrMsg());
                return resultDTO.getCode();
            }
            return "0";
        } catch (Exception ex) {
            // Best effort: a failed start update must not prevent the task from running.
            LOGGER.warn("create retry task exception", ex);
            return "0";
        }
    }

    /**
     * Copies the identifying fields of the task (task id, instance id, test flag, project name,
     * task name and token) into the log DTO.
     *
     * @param retryTaskLogDTO  the DTO to fill
     * @param retryTaskContext the task to read from
     */
    private void setBasicLogInfo(RetryTaskLogDTO retryTaskLogDTO, RetryTaskContext retryTaskContext) {
        retryTaskLogDTO.setTaskId(retryTaskContext.getTaskId());
        retryTaskLogDTO.setId(retryTaskContext.getInstanceId());
        retryTaskLogDTO.setIsTest(retryTaskContext.getIsTest());
        retryTaskLogDTO.setProjectName(retryTaskContext.getProjectName());
        retryTaskLogDTO.setTaskName(retryTaskContext.getTaskName());
        retryTaskLogDTO.setToken(retryTaskContext.getToken());
    }

    /**
     * Not implemented by this executor.
     *
     * @return always {@code null}
     */
    @Override
    public Result<String> killTask() {
        return null;
    }

    /**
     * Not implemented by this executor.
     *
     * @return always {@code null}
     */
    @Override
    public Result<String> callBack() {
        return null;
    }

    /**
     * Not implemented by this executor.
     *
     * @param retryTaskDTO ignored
     * @return always {@code null}
     */
    @Override
    public Result<String> createSchedulerTask(RetryTaskDTO retryTaskDTO) {
        return null;
    }

}
